[Apache Hudi] Datadogと連携してメトリクスを表示する
データ・アナリティクス事業本部の森脇です。
Apache Hudiには0.8.0でメトリクス機能が追加されました。
この機能を使うことで、コミットやロールバックに関するデータを3rdツールと連携することが可能です。
連携先にはGraphite,JMX,Datadog等が利用可能です。
今回はDatadogと連携し、メトリクスがどのように表示されるか試してみました。
Glueジョブ
例によってGlueジョブを使ってApache Hudiを動かします。
Datadogの接続情報はMetrics configsに設定することで連携が可能です。
Agentのようなものをインストールする必要はなく、Apache Hudiのみで連携可能です。
import sys import random import boto3 import botocore.exceptions from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from pyspark.sql.session import SparkSession from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate() sc = spark.sparkContext glueContext = GlueContext(sc) job = Job(glueContext) job.init(args['JOB_NAME'], args) # クイックスタート用ライブラリを利用してデータ、及びデータフレームを作成 dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator() def genDF(): inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(random.randint(50, 100))) df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) return df def existsTable(databaseName, tableName): glue = boto3.client("glue") exists = False try: resp = glue.get_table( DatabaseName="default", Name="hudi_metrics_rt", ) except botocore.exceptions.ClientError as err: if err.response["Error"]["Code"] == "EntityNotFoundException": exists = False else: exists = True return exists databaseName = "default" tableName = 'hudi_metrics_sample' # テーブル名 bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット basePath = f's3://{bucketName}/{tableName}' # Hudiのオプション hudi_options = { 'hoodie.table.name': tableName, # テーブル名 # 書き込みオプション 'hoodie.datasource.write.storage.type': 'MERGE_ON_READ', 'hoodie.compact.inline': 'false', 'hoodie.datasource.write.recordkey.field': 'uuid', # レコードキーのカラム名 'hoodie.datasource.write.partitionpath.field': 'partitionpath', # パーティション対象のカラム名 'hoodie.datasource.write.table.name': tableName, # テーブル名 'hoodie.datasource.write.operation': 'upsert', # 書き込み操作種別 'hoodie.datasource.write.precombine.field': 'ts', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される) 'hoodie.upsert.shuffle.parallelism': 2, # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用) 'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数 # データカタログ連携オプション(hive_sync) 'hoodie.datasource.hive_sync.enable': 'true', # 連携を有効にする 'hoodie.datasource.hive_sync.database': databaseName, # 連携先のデータベース名 'hoodie.datasource.hive_sync.table': tableName, # 連携先のテーブル名 'hoodie.datasource.hive_sync.partition_fields': 'contient,country,city', # パーティションのフィールド名 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor', # パーティションの方式を指定 'hoodie.datasource.hive_sync.use_jdbc': 'false', # jdbcを利用すると接続エラーになったのでfalseにする。 # メトリクスオプション(for Datadog) 'hoodie.metrics.on': 'true', # メトリクスをOn 'hoodie.metrics.reporter.type': 'DATADOG', # 連携対象はDatadog 'hoodie.metrics.datadog.report.period.seconds': '30', # レポート間隔 'hoodie.metrics.datadog.api.site': 'US', # Datadogサイトのリージョン(US or EU) 'hoodie.metrics.datadog.api.key': 'xxxxxxxxxxxxxxxxxxxxxxxxx', # APIキー 'hoodie.metrics.datadog.api.key.skip.validation': 'false', # trueの場合、データ送信時の事前Validationをskipする 'hoodie.metrics.datadog.metric.prefix': 'sample', # メトリクスのprefix 'hoodie.metrics.datadog.metric.host': 'apache_hudi.glue', # メトリクスのホスト } # テーブルが存在しない場合は新規作成、存在する場合は追記 mode = "overwrite" if existsTable(databaseName, tableName): mode = "append" # データの書き込み genDF().write.format("hudi"). \ options(**hudi_options). \ mode(mode). \ save(basePath) job.commit()
テーブルが存在しない場合は作成し、存在する場合はNレコードを追記するジョブを作成しました。
このジョブを複数回実行し、Datadogで結果を確認してみます。
メトリクスを確認してみる
DatadogのMetrics Explorerを見てみます。
Metricsがたくさん作成されています。
一覧:
sample.hudi_metrics_sample.HoodieWrapperFileSystem.listStatus sample.hudi_metrics_sample.HoodieWrapperFileSystem.listStatus.totalDuration sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.create sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.create.totalDuration sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.delete sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.delete.totalDuration sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.getFileStatus sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.getFileStatus.totalDuration sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.listStatus sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.listStatus.totalDuration sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.write sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.write.totalBytes sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.write.totalDuration sample.hudi_metrics_sample.TimelineService.LATEST_DATA_FILES_BEFORE_ON_INSTANT sample.hudi_metrics_sample.TimelineService.LATEST_PARTITION_DATA_FILE sample.hudi_metrics_sample.TimelineService.LATEST_SLICES_BEFORE_ON_INSTANT sample.hudi_metrics_sample.TimelineService.PEDING_COMPACTION_OPS sample.hudi_metrics_sample.TimelineService.TOTAL_API_CALLS sample.hudi_metrics_sample.TimelineService.TOTAL_API_TIME sample.hudi_metrics_sample.TimelineService.TOTAL_CHECK_TIME sample.hudi_metrics_sample.TimelineService.TOTAL_HANDLE_TIME sample.hudi_metrics_sample.TimelineService.TOTAL_REFRESH_TIME sample.hudi_metrics_sample.TimelineService.WRITE_VALUE_CNT sample.hudi_metrics_sample.TimelineService.WRITE_VALUE_TIME sample.hudi_metrics_sample.deltacommit.commitTime sample.hudi_metrics_sample.deltacommit.duration sample.hudi_metrics_sample.deltacommit.totalBytesWritten sample.hudi_metrics_sample.deltacommit.totalCompactedRecordsUpdated sample.hudi_metrics_sample.deltacommit.totalCreateTime sample.hudi_metrics_sample.deltacommit.totalFilesInsert sample.hudi_metrics_sample.deltacommit.totalFilesUpdate sample.hudi_metrics_sample.deltacommit.totalInsertRecordsWritten sample.hudi_metrics_sample.deltacommit.totalLogFilesCompacted sample.hudi_metrics_sample.deltacommit.totalLogFilesSize sample.hudi_metrics_sample.deltacommit.totalPartitionsWritten sample.hudi_metrics_sample.deltacommit.totalRecordsWritten sample.hudi_metrics_sample.deltacommit.totalScanTime sample.hudi_metrics_sample.deltacommit.totalUpdateRecordsWritten sample.hudi_metrics_sample.deltacommit.totalUpsertTime sample.hudi_metrics_sample.finalize.duration sample.hudi_metrics_sample.finalize.numFilesFinalized sample.hudi_metrics_sample.index.UPSERT.duration sample.hudi_metrics_sample.index.lookup.duration
試しにsample.hudi_metrics_sample.deltacommit.totalInsertRecordsWritten
を表示してみます。
コミット毎に追加(INSERT)されたレコード数の推移を確認することができました。
まとめ
Apache Hudiのメトリクス機能を利用することで、Datadogと連携することができました。
デバッグや性能監視に非常に便利そうだと感じました。
参考
※Apache®、Apache Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。